Enforce sorting handle fetchable operators, add option to repartition based on row count estimates #11875
Conversation
@@ -3019,11 +3019,11 @@ mod tests {
assert_batches_sorted_eq!(
[
"+-----+-----+----+-------+",
The result of this test changes with this PR. I analyzed the change; previously, this test was generating the following plan:
"ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, CAST(c2@1 AS Int8) + c3@2 as sum]",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortExec: expr=[c1@0 ASC,c2@1 ASC,c3@2 ASC], preserve_partitioning=[false]",
" GlobalLimitExec: skip=0, fetch=1",
" CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192, fetch=1",
" FilterExec: c2@1 = 3 AND c1@0 = a",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[<PATH>]]}, projection=[c1, c2, c3], has_header=true",
After the changes in this PR, the following plan is generated:
"ProjectionExec: expr=[c1@0 as one, c2@1 as two, c3@2 as c3, CAST(c2@1 AS Int8) + c3@2 as total]",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortExec: TopK(fetch=1), expr=[c3@2 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c2@1 = 3 AND c1@0 = a",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[<PATH>]]}, projection=[c1, c2, c3], has_header=true",
I think the second plan generates a deterministic result. However, the query (a DataFrame query) is not deterministic as written.
With this observation, I have updated the placement of the limit to make sure the query is deterministic after execution. With the limit moved, the following plan is generated:
"ProjectionExec: expr=[c1@0 as one, c2@1 as two, c3@2 as c3, CAST(c2@1 AS Int8) + c3@2 as total]",
" GlobalLimitExec: skip=0, fetch=1",
" SortPreservingMergeExec: [c1@0 ASC,c2@1 ASC,c3@2 ASC], fetch=1",
" SortExec: TopK(fetch=1), expr=[c3@2 ASC], preserve_partitioning=[true]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c2@1 = 3 AND c1@0 = a",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[<PATH>]]}, projection=[c1, c2, c3], has_header=true",
I agree. The previous test did a sort right after a select + filter, which will not produce a deterministic result, so applying the limit after the sort makes sense.
LGTM
05)--------ProjectionExec: expr=[]
06)----------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
07)------------CoalesceBatchesExec: target_batch_size=4096
08)--------------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
09)----------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
10)------------------ProjectionExec: expr=[c1@0 as c1]
11)--------------------CoalesceBatchesExec: target_batch_size=4096
12)----------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
13)------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
14)--------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
A better plan 🚀
This is looking very good!
For other reviewers: the optimizer now removes the RoundRobin repartition (RR) in some SLT tests because we estimate the input to be a single batch (the RR would be pointless). We are getting very smart 🚀
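For intuition, a hedged sketch of the kind of estimate-based check described above; the names, threshold, and structure are hypothetical, not the actual rule code:

/// Hypothetical helper: decide whether a RoundRobin repartition is worth
/// adding, based on the row-count estimate from statistics.
fn worth_round_robin(
    estimated_rows: Option<usize>,
    batch_size: usize,
    target_partitions: usize,
) -> bool {
    match estimated_rows {
        // Everything fits in a single batch: fanning it out over several
        // partitions cannot add parallelism, so skip the RR.
        Some(rows) if rows <= batch_size => false,
        // Unknown or large inputs: repartition if we have multiple targets.
        _ => target_partitions > 1,
    }
}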
@alamb it would be great if you could take a look
Will put it on my list for today.
Thanks @mustafasrepo and @ozankabak -- I went through this PR carefully and I think it looks good to me.
I had some improvement suggestions, but I don't think any are necessary prior to merge.
Thanks for the review @alamb -- I will send one more commit and then this will be good to go.
After a careful study of the code, I have one issue in mind (for which I left an inline comment). We can merge the code after we make sure the plan change in question is not due to a regression.
05)--------MemoryExec: partitions=4
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[]
06)----------MemoryExec: partitions=1
This is the only thing I don't understand here. I studied the rule logic, but it is not clear to me why we don't use the source's multi-partition output and instead add a RoundRobin repartition later on.
Once we are sure this is not due to some regression, we can merge this PR.
OK, I figured out what is going on here. With the optimizations we now do, the CREATE TABLE ... SELECT ... query doesn't create a multi-partition table (because it is not helpful). Therefore we see the RR in the downstream test. Reducing the batch size just before the test gives us the old plan. I updated the comment above accordingly.
So all is good, ready to go.
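For other readers, a minimal sketch of the batch-size tweak described above. The datafusion.execution.batch_size knob is real; the helper itself is illustrative:

use datafusion::prelude::*;

fn small_batch_ctx() -> SessionContext {
    // With a tiny batch size, the source spans many batches again, so the
    // optimizer keeps multi-partition execution and the downstream test
    // sees the old plan instead of the single-partition one.
    let config = SessionConfig::new().with_batch_size(1);
    SessionContext::new_with_config(config)
}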
Revert "Enforce sorting handle fetchable operators, add option to repartition based on row count estimates (apache#11875)". This reverts commit 79fa6f9.
Which issue does this PR close?
Closes #.
Rationale for this change
What changes are included in this PR?
@alihandroid recently added limit pushdown support for the physical plan. After that PR, I recognized that the EnforceSorting rule has some problems handling operators with a fetch: it sometimes loses the fetch count during sort pushdown (since the LimitPushdown rule runs after EnforceSorting, we do not currently hit the erroneous cases). Hence, I added unit tests that trigger the erroneous handling; an illustrative sketch of the failure mode appears at the end of this description.
Are these changes tested?
Yes, unit tests are added.
Are there any user-facing changes?
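To make the failure mode from the rationale concrete, here is an illustrative before/after of a fetch being lost during sort pushdown. The plan text is hand-written for illustration, not taken from an actual test:

// Before: the sort carries its fetch, so it runs as a TopK.
"SortExec: TopK(fetch=1), expr=[c3@2 ASC], preserve_partitioning=[false]",

// After an erroneous pushdown, the fetch is dropped and the node
// degrades to a full sort of the input.
"SortExec: expr=[c3@2 ASC], preserve_partitioning=[false]",

The unit tests added in this PR assert that the fetch survives such rewrites.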